package nivalsoul.kettle.plugins.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.trans.Trans;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * Kettle step plugin that either consumes messages from a Kafka topic and emits
 * them as rows (roleType=consumer), or publishes incoming step rows to a Kafka
 * topic (roleType=producer). Configuration comes from {@code configInfo}
 * (supplied by the base class — NOTE(review): contract assumed from usage):
 * <ul>
 *   <li>{@code roleType} — "consumer" or "producer"; anything else passes rows through.</li>
 *   <li>{@code topic} — topic name, variable-substituted.</li>
 *   <li>{@code configs} — JSON object of Kafka client properties, variable-substituted.</li>
 * </ul>
 */
public class KafkaOptPlugin extends CommonStepRunBase {

	/** Kafka topic to consume from / produce to (after variable substitution). */
	String topic = "";
	/** Raw Kafka client configuration parsed from the step's "configs" JSON. */
	Map<String, Object> configs;

	/**
	 * Main processing entry point called by the base step.
	 *
	 * @return {@code true} while more rows should be processed (pass-through
	 *         role only), {@code false} once the step is done.
	 * @throws Exception on any configuration or Kafka failure
	 */
	@Override
	public boolean run() throws Exception {
		Object[] r = commonStep.getRow();
        if (commonStep.first) {
        	data.outputRowMeta = new RowMeta();
        	meta.getFields(data.outputRowMeta, commonStep.getStepname(), null, null, 
        			commonStep, commonStep.getRepository(), commonStep.getMetaStore());
            commonStep.first = false;
            init();
        }
        // Resolve role, topic and Kafka client configs (with Kettle variable substitution).
        String roleType = configInfo.getString("roleType");
        topic = commonStep.environmentSubstitute(configInfo.getString("topic"));
        configs = JSON.parseObject(commonStep.environmentSubstitute(configInfo.getString("configs")));
        if("consumer".equals(roleType)) {
			consumer();
        }else if("producer".equals(roleType)) {
			producer(r);
        }else {
        	// Unknown role: behave as a simple pass-through step.
        	return doNothing(r);
        }

        commonStep.setOutputDone();
        return false;
	}

	/**
	 * Polls the configured topic until the transformation is stopped, emitting
	 * one output row (offset, key, value) per Kafka record. If the step has no
	 * output fields configured, a default offset/key/value layout is installed.
	 *
	 * @throws KettleStepException if a row cannot be passed downstream
	 */
	private void consumer() throws KettleStepException {
		JSONArray outputFields = meta.getOutputFields();
		if(outputFields==null || outputFields.size()==0) {
			// No output layout configured: fall back to offset/key/value.
			outputFields = new JSONArray();
			JSONObject field = new JSONObject();
			field.put("name", "offset");
			field.put("type", "Integer");
			outputFields.add(field);
			field = new JSONObject();
			field.put("name", "key");
			field.put("type", "String");
			outputFields.add(field);
			field = new JSONObject();
			field.put("name", "value");
			field.put("type", "String");
			outputFields.add(field);
			meta.setOutputFields(outputFields);
			// Rebuild the output row meta to reflect the default fields.
			meta.getFields(data.outputRowMeta, commonStep.getStepname(), null, null, 
					commonStep, commonStep.getRepository(), commonStep.getMetaStore());
		}

		// try-with-resources guarantees the consumer is closed even if putRow throws.
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
			consumer.subscribe(Arrays.asList(topic));
			while (!commonStep.getTrans().isStopped()) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records){
					Object[] outputRowData = RowDataUtil.allocateRowData( data.outputRowMeta.size() );
					outputRowData[0] = record.offset();
					outputRowData[1] = record.key();
					outputRowData[2] = record.value();
					commonStep.putRow(data.outputRowMeta, outputRowData);
				}
			}
		}
	}

	/**
	 * Publishes every incoming step row to the configured topic, serialized as
	 * a JSON array of the row's values. Uses the user-supplied {@code configs}
	 * for the producer (these must include bootstrap.servers and the
	 * key/value serializers — NOTE(review): not validated here).
	 *
	 * @param firstRow the row already fetched by {@link #run()}, or {@code null}
	 *                 if the previous step produced no rows
	 * @throws KettleStepException if fetching a subsequent row fails
	 */
	private void producer(Object[] firstRow) throws KettleStepException {
		Properties props = new Properties();
		props.putAll(configs);

		// try-with-resources flushes and closes the producer even on failure.
		try (Producer<String, String> producer = new KafkaProducer<>(props)) {
			Object[] row = firstRow;
			while (row != null && !commonStep.getTrans().isStopped()) {
				producer.send(new ProducerRecord<>(topic, JSON.toJSONString(row)));
				row = commonStep.getRow();
			}
		} catch (Exception e) {
			// Preserve the cause so connection/serialization failures are diagnosable.
			throw new KettleStepException("Failed to produce rows to Kafka topic " + topic, e);
		}
	}

	/**
	 * Pass-through behavior for unrecognized role types: forwards the row
	 * unchanged, or finishes the step when the input is exhausted.
	 *
	 * @param r the current input row, or {@code null} at end of input
	 * @return {@code true} if more rows are expected, {@code false} when done
	 * @throws Exception propagated from {@code end()} or {@code putRow}
	 */
	private boolean doNothing(Object[] r) throws Exception {
		if (r == null) {
		    end();
		    commonStep.setOutputDone();
		    return false;
		}
		commonStep.putRow(data.outputRowMeta, r);
		return true;
	}

}
